| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620 |
2x
2x
2x
2x
2x
2x
2x
2x
2x
2x
2x
2x
2x
2x
2x
2x
2x
2x
2x
2x
426x
426x
426x
426x
2x
2x
591x
591x
591x
1396x
591x
591x
591x
591x
591x
591x
591x
591x
591x
591x
2x
591x
591x
591x
591x
591x
426x
426x
426x
426x
426x
426x
426x
426x
426x
426x
426x
426x
426x
426x
426x
426x
426x
270x
270x
270x
270x
270x
270x
270x
270x
586x
586x
586x
586x
586x
586x
2x
12x
192x
322x
192x
192x
192x
192x
192x
190x
14x
176x
10x
2x
192x
166x
94x
72x
10x
62x
653x
653x
653x
567x
567x
20x
653x
653x
2x
710x
710x
515x
515x
515x
10x
710x
6x
6x
6x
6x
2x
2x
2x
2x
2x
2x
4x
4x
4x
4x
4x
494x
494x
494x
494x
494x
56x
56x
56x
56x
56x
2x
586x
586x
276x
586x
586x
2x
550x
550x
540x
540x
540x
540x
56x
484x
540x
540x
2x
274x
274x
274x
274x
2x
1105x
56x
28x
28x
28x
28x
28x
1105x
2x
28x
28x
28x
28x
28x
28x
28x
28x
1379x
1379x
1379x
28x
28x
2x
26x
26x
26x
2x
1248x
1807x
1807x
1807x
1807x
1807x
1105x
1105x
1105x
1095x
10x
10x
1105x
1105x
1105x
1105x
780x
780x
780x
1807x
1807x
1807x
1807x
2x
2491x
18x
18x
18x
18x
18x
2x
/**
* Copyright 2017 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { User } from '../auth/user';
import { EagerGarbageCollector } from '../local/eager_garbage_collector';
import { LocalStore } from '../local/local_store';
import { LocalViewChanges } from '../local/local_view_changes';
import { QueryData, QueryPurpose } from '../local/query_data';
import { ReferenceSet } from '../local/reference_set';
import { MaybeDocumentMap } from '../model/collections';
import { MaybeDocument, NoDocument } from '../model/document';
import { DocumentKey } from '../model/document_key';
import { Mutation } from '../model/mutation';
import { MutationBatchResult } from '../model/mutation_batch';
import { CurrentStatusUpdate, RemoteEvent } from '../remote/remote_event';
import { RemoteStore } from '../remote/remote_store';
import { RemoteSyncer } from '../remote/remote_syncer';
import { assert, fail } from '../util/assert';
import { FirestoreError } from '../util/error';
import * as log from '../util/log';
import { AnyJs, primitiveComparator } from '../util/misc';
import * as objUtils from '../util/obj';
import { ObjectMap } from '../util/obj_map';
import { Deferred } from '../util/promise';
import { SortedMap } from '../util/sorted_map';
import { isNullOrUndefined } from '../util/types';
import { Query } from './query';
import { SnapshotVersion } from './snapshot_version';
import { TargetIdGenerator } from './target_id_generator';
import { Transaction } from './transaction';
import { BatchId, OnlineState, ProtoByteString, TargetId } from './types';
import {
AddedLimboDocument,
LimboDocumentChange,
RemovedLimboDocument,
View,
ViewDocumentChanges
} from './view';
import { ViewSnapshot } from './view_snapshot';
/** Tag passed to the logger for debug messages emitted from this file. */
const LOG_TAG = 'SyncEngine';
/** Callback type that receives a batch of view snapshots. */
export type ViewHandler = (viewSnaps: ViewSnapshot[]) => void;
/** Callback type that receives a query together with an error for it. */
export type ErrorHandler = (query: Query, error: Error) => void;
/**
 * Bookkeeping record holding everything SyncEngine tracks for one query it is
 * listening to.
 */
class QueryView {
  /** The query itself. */
  public query: Query;

  /**
   * The client-assigned target number that identifies this query on the
   * watch stream.
   */
  public targetId: TargetId;

  /**
   * Identifier from the datastore backend marking the last state of the
   * results that was received. It can be used to indicate where to continue
   * receiving new doc changes for the query.
   */
  public resumeToken: ProtoByteString;

  /**
   * Computes the final merged truth of which docs are in the query. It is
   * notified of local and remote changes and applies the query filters and
   * limits to determine the most correct possible results.
   */
  public view: View;

  constructor(
    query: Query,
    targetId: TargetId,
    resumeToken: ProtoByteString,
    view: View
  ) {
    this.query = query;
    this.targetId = targetId;
    this.resumeToken = resumeToken;
    this.view = view;
  }
}
/**
* SyncEngine is the central controller in the client SDK architecture. It is
* the glue code between the EventManager, LocalStore, and RemoteStore. Some of
* SyncEngine's responsibilities include:
* 1. Coordinating client requests and remote events between the EventManager
* and the local and remote data stores.
* 2. Managing a View object for each query, providing the unified view between
* the local and remote data stores.
* 3. Notifying the RemoteStore when the LocalStore has new mutations in its
* queue that need sending to the backend.
*
* The SyncEngine’s methods should only ever be called by methods running in the
* global async queue.
*/
export class SyncEngine implements RemoteSyncer {
/**
* Receives new view snapshots. Null until subscribe() has been called.
*/
private viewHandler: ViewHandler | null = null;
/**
* Receives the query and error when a listen is rejected. Null until
* subscribe() has been called.
*/
private errorHandler: ErrorHandler | null = null;
/** The QueryView of every active listen, keyed by the query's canonical ID. */
private queryViewsByQuery = new ObjectMap<Query, QueryView>(q =>
q.canonicalId()
);
/** The same QueryViews as `queryViewsByQuery`, indexed by target ID. */
private queryViewsByTarget: { [targetId: number]: QueryView } = {};
/**
* Maps the key of each document in limbo to the target ID of the listen that
* was started to resolve it.
*/
private limboTargetsByKey = new SortedMap<DocumentKey, TargetId>(
DocumentKey.comparator
);
/** Reverse of `limboTargetsByKey`: limbo listen target ID to document key. */
private limboKeysByTarget: { [targetId: number]: DocumentKey } = {};
/**
* References from query target IDs to the limbo document keys that those
* queries put in limbo.
*/
private limboDocumentRefs = new ReferenceSet();
/**
* Garbage collector that, once subscribe() registers `limboDocumentRefs` with
* it, reports the keys whose limbo listens should be stopped.
*/
private limboCollector = new EagerGarbageCollector();
/**
* Stores user completion handlers, indexed first by the current user's key
* and then by BatchId.
*/
private mutationUserCallbacks = {} as {
[uidKey: string]: SortedMap<BatchId, Deferred<void>>;
};
/** Source of the target IDs used for limbo document listens. */
private targetIdGenerator = TargetIdGenerator.forSyncEngine();
/**
* @param localStore Local store used to run queries and apply writes and
* remote events.
* @param remoteStore Remote store used to start/stop listens, create
* transactions and send writes.
* @param currentUser The user whose mutation callbacks are currently tracked;
* replaced by handleUserChange().
*/
constructor(
private localStore: LocalStore,
private remoteStore: RemoteStore,
private currentUser: User
) {}
/**
 * Registers the handlers that receive view snapshots and listen errors, and
 * hooks the limbo document references up to the limbo garbage collector.
 * May only be called once.
 */
subscribe(viewHandler: ViewHandler, errorHandler: ErrorHandler): void {
  const missingHandler = viewHandler === null || errorHandler === null;
  assert(!missingHandler, 'View and error handlers cannot be null');

  const hasSubscriber = this.viewHandler !== null || this.errorHandler !== null;
  assert(!hasSubscriber, 'SyncEngine already has a subscriber.');

  this.viewHandler = viewHandler;
  this.errorHandler = errorHandler;
  this.limboCollector.addGarbageSource(this.limboDocumentRefs);
}
/**
 * Starts a new listen for the given query.
 *
 * The query is allocated and executed against the local store, an initial
 * snapshot is raised through the subscribed view handler, and the listen is
 * then handed to the remote store. Later snapshots and errors are delivered
 * to the subscribed handlers.
 *
 * @returns A promise resolving to the target ID allocated for the query.
 */
listen(query: Query): Promise<TargetId> {
  this.assertSubscribed('listen()');
  const alreadyListening = this.queryViewsByQuery.has(query);
  assert(!alreadyListening, 'We already listen to the query: ' + query);

  return this.localStore.allocateQuery(query).then(queryData =>
    this.localStore
      .executeQuery(query)
      .then(localDocs =>
        this.localStore
          .remoteDocumentKeys(queryData.targetId)
          .then(remoteKeys => {
            // Build the initial view from what is already known locally.
            const view = new View(query, remoteKeys);
            const initialChange = view.applyChanges(
              view.computeDocChanges(localDocs)
            );
            assert(
              initialChange.limboChanges.length === 0,
              'View returned limbo docs before target ack from the server.'
            );
            assert(
              !!initialChange.snapshot,
              'applyChanges for new view should always return a snapshot'
            );

            // Register the view under both of its lookup keys.
            const queryView = new QueryView(
              query,
              queryData.targetId,
              queryData.resumeToken,
              view
            );
            this.queryViewsByQuery.set(query, queryView);
            this.queryViewsByTarget[queryData.targetId] = queryView;

            // Raise the initial snapshot before starting the remote listen.
            this.viewHandler!([initialChange.snapshot!]);
            this.remoteStore.listen(queryData);
          })
      )
      .then(() => queryData.targetId)
  );
}
/**
 * Stops listening to the query: releases it in the local store, stops the
 * remote listen, drops the view bookkeeping and finally lets the local store
 * collect garbage.
 */
unlisten(query: Query): Promise<void> {
  this.assertSubscribed('unlisten()');
  const registeredView = this.queryViewsByQuery.get(query)!;
  assert(!!registeredView, 'Trying to unlisten on query not found:' + query);

  return this.localStore
    .releaseQuery(query)
    .then(() => {
      this.remoteStore.unlisten(registeredView.targetId);
      return this.removeAndCleanupQuery(registeredView);
    })
    .then(() => this.localStore.collectGarbage());
}
/**
 * Starts a local write of the given mutation batch: the writes are added to
 * the mutation queue, events are raised for any changes the write caused,
 * and the remote store is told to fill its write pipeline.
 *
 * The returned promise resolves when those steps have completed, *not* when
 * the backend has acked the write. `userCallback` is resolved or rejected
 * once the write has been acked/rejected by the backend (or has failed
 * locally for any other reason).
 */
write(batch: Mutation[], userCallback: Deferred<void>): Promise<void> {
  this.assertSubscribed('write()');
  return this.localStore.localWrite(batch).then(localWriteResult => {
    // Register the callback before raising snapshots for the new batch.
    this.addMutationCallback(localWriteResult.batchId, userCallback);
    const snapshotsRaised = this.emitNewSnapsAndNotifyLocalStore(
      localWriteResult.changes
    );
    return snapshotsRaised.then(() => this.remoteStore.fillWritePipeline());
  });
}
/**
* Converts an error produced by a transaction update function into the value
* used to reject the transaction promise. Currently returns the error
* unchanged.
*
* TODO(klimt): Wrap the given error in a standard Firestore error object.
*/
private wrapUpdateFunctionError(error: AnyJs): AnyJs {
return error;
}
/**
 * Runs `updateFunction`, in which a set of reads and writes can be performed
 * atomically through the supplied transaction object, and then commits the
 * transaction. If the commit fails (for example because another client
 * changed data the transaction referenced), the updateFunction is run again
 * on a fresh transaction. Once the given number of retries is exhausted, the
 * returned promise is rejected with the commit error.
 *
 * An error thrown by, or a rejection returned from, the updateFunction is
 * not retried; it rejects the returned promise directly.
 *
 * The transaction object passed to the updateFunction contains methods for
 * accessing documents and collections. Unlike other datastore access, data
 * accessed with the transaction will not reflect local changes that have not
 * been committed. For this reason, it is required that all reads are
 * performed before any writes. Transactions must be performed while online.
 *
 * @param updateFunction The user callback; it must return a Promise.
 * @param retries How many more times to retry after a failed commit.
 * @returns A promise resolved with the updateFunction's result once the
 * transaction is fully committed.
 */
runTransaction<T>(
  updateFunction: (transaction: Transaction) => Promise<T>,
  retries = 5
): Promise<T> {
  assert(retries >= 0, 'Got negative number of retries for transaction.');
  const transaction = this.remoteStore.createTransaction();

  // Invokes the user's callback, turning synchronous throws and non-promise
  // return values into rejected promises.
  const runUpdateFunction = () => {
    try {
      const userPromise = updateFunction(transaction);
      const looksLikePromise =
        !isNullOrUndefined(userPromise) &&
        userPromise.catch &&
        userPromise.then;
      if (!looksLikePromise) {
        return Promise.reject<T>(
          Error('Transaction callback must return a Promise')
        );
      }
      return userPromise.catch(e =>
        Promise.reject<T>(this.wrapUpdateFunctionError(e))
      );
    } catch (e) {
      return Promise.reject<T>(this.wrapUpdateFunctionError(e));
    }
  };

  return runUpdateFunction().then(result =>
    transaction
      .commit()
      .then(() => result)
      .catch(error =>
        // TODO(klimt): Put in a retry delay?
        retries === 0
          ? Promise.reject<T>(error)
          : this.runTransaction(updateFunction, retries - 1)
      )
  );
}
/**
 * Applies a remote event to the local store and raises snapshots for every
 * view affected by the resulting document changes.
 *
 * Before applying it, a delete is synthesized for any limbo document whose
 * target was marked current without the document being part of the event.
 */
applyRemoteEvent(remoteEvent: RemoteEvent): Promise<void> {
  this.assertSubscribed('applyRemoteEvent()');

  // Make sure limbo documents are deleted if there were no results
  objUtils.forEachNumber(
    remoteEvent.targetChanges,
    (targetId, targetChange) => {
      const limboKey = this.limboKeysByTarget[targetId];
      if (!limboKey) {
        // Not a limbo resolution target.
        return;
      }
      if (
        targetChange.currentStatusUpdate !== CurrentStatusUpdate.MarkCurrent
      ) {
        return;
      }
      if (remoteEvent.documentUpdates.get(limboKey)) {
        // The event carries an update for the document; nothing to fake.
        return;
      }

      // When listening to a query the server responds with a snapshot
      // containing documents matching the query and a current marker
      // telling us we're now in sync. These can arrive as separate remote
      // events or as a single remote event. For a document query, no
      // documents are sent in the response if the document doesn't exist.
      //
      // If the snapshot arrives separately from the current marker, it is
      // handled normally and updateTrackedLimbos will resolve the limbo
      // status of the document, removing it from limboDocumentRefs. This
      // works because clients only initiate limbo resolution when a target
      // is current and because all current targets are always at a
      // consistent snapshot.
      //
      // However, if the document doesn't exist and the current marker
      // arrives, the document is not present in the snapshot and normal
      // view handling would leave the document in limbo indefinitely
      // because there are no updates to it. That single case is handled
      // here by synthesizing a delete.
      //
      // TODO(dimond): Ideally we would have an explicit lookup query
      // instead resulting in an explicit delete message and we could
      // remove this special logic.
      const syntheticDelete = new NoDocument(
        limboKey,
        remoteEvent.snapshotVersion
      );
      remoteEvent.addDocumentUpdate(syntheticDelete);
    }
  );

  return this.localStore
    .applyRemoteEvent(remoteEvent)
    .then(changes =>
      this.emitNewSnapsAndNotifyLocalStore(changes, remoteEvent)
    );
}
/**
 * Applies an OnlineState change to every active view and raises the
 * snapshots that the change produced through the subscribed view handler.
 *
 * The view handler is invoked exactly once, even when no view produced a
 * snapshot (in which case it receives an empty array).
 *
 * @param onlineState The new online state to propagate to the views.
 */
applyOnlineStateChange(onlineState: OnlineState): void {
  const newViewSnapshots: ViewSnapshot[] = [];
  this.queryViewsByQuery.forEach((_, queryView) => {
    const viewChange = queryView.view.applyOnlineStateChange(onlineState);
    assert(
      viewChange.limboChanges.length === 0,
      'OnlineState should not affect limbo documents.'
    );
    if (viewChange.snapshot) {
      newViewSnapshots.push(viewChange.snapshot);
    }
  });
  // `viewHandler` is declared nullable; assert non-null the same way every
  // other call site in this class does so the call type-checks under
  // strictNullChecks.
  this.viewHandler!(newViewSnapshots);
}
/**
 * Handles the rejection of the listen identified by `targetId`.
 *
 * If the target is a limbo document listen, its limbo bookkeeping is
 * dropped and a synthetic remote event deleting the document is applied.
 * Otherwise the query is released in the local store, its view is removed
 * and the subscribed error handler is called with the query and `err`.
 *
 * @param targetId The target ID of the rejected listen.
 * @param err The error the listen was rejected with.
 */
rejectListen(targetId: TargetId, err: FirestoreError): Promise<void> {
  this.assertSubscribed('rejectListen()');

  const limboKey = this.limboKeysByTarget[targetId];
  if (limboKey) {
    // Since this query failed, we won't want to manually unlisten to it.
    // So go ahead and remove it from bookkeeping.
    this.limboTargetsByKey = this.limboTargetsByKey.remove(limboKey);
    delete this.limboKeysByTarget[targetId];

    // TODO(klimt): We really only should do the following on permission
    // denied errors, but we don't have the cause code here.

    // It's a limbo doc. Create a synthetic event saying it was deleted.
    // This is kind of a hack. Ideally, we would have a method in the local
    // store to purge a document. However, it would be tricky to keep all of
    // the local store's invariants with another method.
    let docMap = new SortedMap<DocumentKey, MaybeDocument>(
      DocumentKey.comparator
    );
    docMap = docMap.insert(
      limboKey,
      new NoDocument(limboKey, SnapshotVersion.forDeletedDoc())
    );
    const event = new RemoteEvent(SnapshotVersion.MIN, {}, docMap);
    return this.applyRemoteEvent(event);
  }

  const queryView = this.queryViewsByTarget[targetId];
  assert(!!queryView, 'Unknown targetId: ' + targetId);
  return this.localStore.releaseQuery(queryView.query).then(() => {
    return this.removeAndCleanupQuery(queryView).then(() => {
      this.errorHandler!(queryView.query, err);
    });
  });
}
/**
 * Handles a write that the backend accepted: resolves the user callback for
 * the batch, acknowledges the batch in the local store and raises snapshots
 * for the resulting changes.
 */
applySuccessfulWrite(
  mutationBatchResult: MutationBatchResult
): Promise<void> {
  this.assertSubscribed('applySuccessfulWrite()');

  // The local store may or may not be able to apply the write result and
  // raise events immediately (depending on whether the watcher is caught
  // up), so user callbacks are raised first so that they consistently
  // happen before listen events.
  const ackedBatchId = mutationBatchResult.batch.batchId;
  this.processUserCallback(ackedBatchId, /*error=*/ null);

  const acknowledged = this.localStore.acknowledgeBatch(mutationBatchResult);
  return acknowledged.then(changes =>
    this.emitNewSnapsAndNotifyLocalStore(changes)
  );
}
/**
 * Handles a write that failed: rejects the user callback for the batch with
 * `error`, rejects the batch in the local store and raises snapshots for the
 * resulting changes.
 */
rejectFailedWrite(batchId: BatchId, error: FirestoreError): Promise<void> {
  this.assertSubscribed('rejectFailedWrite()');

  // The local store may or may not be able to apply the write result and
  // raise events immediately (depending on whether the watcher is caught
  // up), so user callbacks are raised first so that they consistently
  // happen before listen events.
  this.processUserCallback(batchId, error);

  const rejected = this.localStore.rejectBatch(batchId);
  return rejected.then(changes =>
    this.emitNewSnapsAndNotifyLocalStore(changes)
  );
}
/**
 * Remembers the user callback for the given batch under the current user's
 * key, creating that user's callback map on first use.
 */
private addMutationCallback(
  batchId: BatchId,
  callback: Deferred<void>
): void {
  const userKey = this.currentUser.toKey();
  const callbacksForUser =
    this.mutationUserCallbacks[userKey] ||
    new SortedMap<BatchId, Deferred<void>>(primitiveComparator);
  this.mutationUserCallbacks[userKey] = callbacksForUser.insert(
    batchId,
    callback
  );
}
/**
 * Resolves or rejects the user callback for the given batch and then discards
 * it.
 *
 * @param batchId The batch whose callback should be completed.
 * @param error The error to reject the callback with, or null to resolve it.
 */
private processUserCallback(batchId: BatchId, error: Error | null): void {
  const userKey = this.currentUser.toKey();
  let newCallbacks = this.mutationUserCallbacks[userKey];

  // NOTE: Mutations restored from persistence won't have callbacks, so it's
  // okay for there to be no callback for this ID.
  if (newCallbacks) {
    const callback = newCallbacks.get(batchId);
    if (callback) {
      assert(
        batchId === newCallbacks.minKey(),
        'Mutation callbacks processed out-of-order?'
      );
      if (error) {
        callback.reject(error);
      } else {
        callback.resolve();
      }
      newCallbacks = newCallbacks.remove(batchId);
    }
    this.mutationUserCallbacks[userKey] = newCallbacks;
  }
}
/**
 * Drops all bookkeeping for the given query view (both lookup maps and its
 * limbo document references) and then garbage collects limbo documents.
 */
private removeAndCleanupQuery(queryView: QueryView): Promise<void> {
  const { query, targetId } = queryView;
  this.queryViewsByQuery.delete(query);
  delete this.queryViewsByTarget[targetId];
  this.limboDocumentRefs.removeReferencesForId(targetId);
  return this.gcLimboDocuments();
}
/**
 * Applies a view's limbo changes to the limbo bookkeeping and then garbage
 * collects limbo documents that are no longer referenced.
 *
 * Added limbo documents gain a reference from `targetId` and get a limbo
 * listen started for them (if one is not already running); removed limbo
 * documents lose their reference from `targetId`.
 *
 * @param targetId The target ID of the query whose view produced the changes.
 * @param limboChanges The limbo changes reported by that view.
 */
private updateTrackedLimbos(
  targetId: TargetId,
  limboChanges: LimboDocumentChange[]
): Promise<void> {
  for (const limboChange of limboChanges) {
    if (limboChange instanceof AddedLimboDocument) {
      this.limboDocumentRefs.addReference(limboChange.key, targetId);
      this.trackLimboChange(limboChange);
    } else if (limboChange instanceof RemovedLimboDocument) {
      log.debug(LOG_TAG, 'Document no longer in limbo: ' + limboChange.key);
      this.limboDocumentRefs.removeReference(limboChange.key, targetId);
    } else {
      fail('Unknown limbo change: ' + JSON.stringify(limboChange));
    }
  }
  return this.gcLimboDocuments();
}
/**
 * Starts a limbo resolution listen for a document that newly entered limbo,
 * unless a listen for that document key is already being tracked.
 *
 * The listen targets a query at the document's own path, using a freshly
 * generated target ID that is recorded in both limbo lookup maps.
 */
private trackLimboChange(limboChange: AddedLimboDocument): void {
  const key = limboChange.key;
  if (!this.limboTargetsByKey.get(key)) {
    log.debug(LOG_TAG, 'New document in limbo: ' + key);
    const limboTargetId = this.targetIdGenerator.next();
    const query = Query.atPath(key.path);
    this.limboKeysByTarget[limboTargetId] = key;
    this.remoteStore.listen(
      new QueryData(query, limboTargetId, QueryPurpose.Listen)
    );
    this.limboTargetsByKey = this.limboTargetsByKey.insert(
      key,
      limboTargetId
    );
  }
}
/**
 * Runs the limbo garbage collector and, for every key it reports, stops the
 * corresponding limbo listen and removes it from both limbo lookup maps.
 */
private gcLimboDocuments(): Promise<void> {
  // HACK: We can use a null transaction here, because we know that the
  // reference set is entirely within memory and doesn't need a store engine.
  const collected = this.limboCollector.collectGarbage(null);
  return collected
    .next(keys => {
      keys.forEach(key => {
        const limboTargetId = this.limboTargetsByKey.get(key);
        // A null target means it already got removed, because the query
        // failed; there is nothing left to tear down for it.
        if (limboTargetId !== null) {
          this.remoteStore.unlisten(limboTargetId);
          this.limboTargetsByKey = this.limboTargetsByKey.remove(key);
          delete this.limboKeysByTarget[limboTargetId];
        }
      });
    })
    .toPromise();
}
/**
* Returns the current map from limbo document key to the target ID of the
* listen started to resolve it. Visible for testing.
*/
currentLimboDocs(): SortedMap<DocumentKey, TargetId> {
return this.limboTargetsByKey;
}
/**
* Recomputes every active view against the given document changes, raises the
* resulting snapshots and reports the per-view changes to the local store.
*
* For each registered query view this computes the view's document changes
* (re-running the query against the local store when the view reports
* `needsRefill`), applies them together with the matching target change from
* `remoteEvent` (when one was supplied), and updates limbo tracking from the
* resulting limbo changes. Once all views have been processed, the view
* handler is called once with all new snapshots (possibly an empty array),
* the local store is notified of the view changes, and finally the local
* store is asked to collect garbage.
*
* @param changes The documents that changed.
* @param remoteEvent The remote event that caused the changes, if any; its
* `targetChanges` entry for a view's target ID is forwarded to that view.
* @returns A promise that resolves once garbage collection has completed.
*/
private emitNewSnapsAndNotifyLocalStore(
changes: MaybeDocumentMap,
remoteEvent?: RemoteEvent
): Promise<void> {
// Accumulators filled in by the per-view promise chains below.
const newSnaps: ViewSnapshot[] = [];
const docChangesInAllViews: LocalViewChanges[] = [];
const queriesProcessed: Array<Promise<void>> = [];
this.queryViewsByQuery.forEach((_, queryView) => {
queriesProcessed.push(
Promise.resolve()
.then(() => {
const viewDocChanges = queryView.view.computeDocChanges(changes);
if (!viewDocChanges.needsRefill) {
return viewDocChanges;
}
// The query has a limit and some docs were removed, so we need
// to re-run the query against the local store to make sure we
// didn't lose any good docs that had been past the limit.
return this.localStore.executeQuery(queryView.query).then(docs => {
return queryView.view.computeDocChanges(docs, viewDocChanges);
});
})
.then((viewDocChanges: ViewDocumentChanges) => {
// Undefined when there is no remote event or it has no entry for this
// view's target.
const targetChange =
remoteEvent && remoteEvent.targetChanges[queryView.targetId];
const viewChange = queryView.view.applyChanges(
viewDocChanges,
targetChange
);
// Limbo bookkeeping is updated before the snapshot is recorded.
return this.updateTrackedLimbos(
queryView.targetId,
viewChange.limboChanges
).then(() => {
if (viewChange.snapshot) {
newSnaps.push(viewChange.snapshot);
const docChanges = LocalViewChanges.fromSnapshot(
viewChange.snapshot
);
docChangesInAllViews.push(docChanges);
}
});
})
);
});
// Wait for every view before raising a single batch of snapshots.
return Promise.all(queriesProcessed)
.then(() => {
this.viewHandler!(newSnaps);
return this.localStore.notifyLocalViewChanges(docChangesInAllViews);
})
.then(() => {
return this.localStore.collectGarbage();
});
}
/**
 * Asserts that subscribe() has installed both handlers.
 *
 * @param fnName Name of the calling method, used in the assertion message.
 */
private assertSubscribed(fnName: string): void {
  const unsubscribed = this.viewHandler === null || this.errorHandler === null;
  assert(
    !unsubscribed,
    'Trying to call ' + fnName + ' before calling subscribe().'
  );
}
/**
 * Switches the sync engine to the given user: records it as the current
 * user, lets the local store handle the change, raises snapshots for the
 * resulting document changes and then informs the remote store.
 */
handleUserChange(user: User): Promise<void> {
  this.currentUser = user;
  const localStoreSwitched = this.localStore.handleUserChange(user);
  return localStoreSwitched.then(changes =>
    this.emitNewSnapsAndNotifyLocalStore(changes).then(() =>
      this.remoteStore.handleUserChange(user)
    )
  );
}
}
|